package com.game.core.dbwrite.domain;

import java.util.ArrayList;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.log4j.Logger;
import org.rocksdb.RocksDB;

import com.game.core.dbwrite.constant.DBOperationType;
import com.game.core.dbwrite.util.KryoUtil;
import com.game.core.dbwrite.util.RocksDBUtil;
import com.game.core.dbwrite.util.WriteToDBUtil;

/**
 * Static staging area that buffers {@link StoreEntity} records in two RocksDB
 * stores and periodically forwards them through {@link WriteToDBUtil}.
 *
 * <p>The {@code exchange} flag selects which store receives new records. Each
 * call to {@link #writeToDatabase()} flips the flag and then processes the
 * store that was active before the flip, so new records go to the other store
 * while the drain is in progress.
 *
 * <p>Locking: producers ({@code addToList}) take the <em>read</em> lock of the
 * store they write to, so several producers may add concurrently; the drain
 * takes the <em>write</em> lock of that store, excluding producers while it
 * runs. All locks are released in {@code finally} blocks so that a failure in
 * RocksDB, serialization or the database write cannot leave a lock held.
 */
public class Storage
{
	private static final Logger logger = Logger.getLogger(Storage.class);
	// Most recent batches read from db1 / db2; null until the first drain of each store.
	private static ArrayList<StoreEntity> list1;
	private static ArrayList<StoreEntity> list2;
	private static final String dbPath1 = "/home/lds/tempDB1";
	private static final String dbPath2 = "/home/lds/tempDB2";
	private static final ReadWriteLock lock1 = new ReentrantReadWriteLock(false);
	private static final ReadWriteLock lock2 = new ReentrantReadWriteLock(false);
	// Flag for alternating writes between the two RocksDB stores: true -> db1, false -> db2.
	// volatile because it is read outside the locks and flipped by the drain.
	private static volatile boolean exchange = true;

	private static final RocksDB db1 = RocksDBUtil.openRocksDB(dbPath1);
	private static final RocksDB db2 = RocksDBUtil.openRocksDB(dbPath2);

	/**
	 * Wraps the given payload in a {@link StoreEntity} and stores it in the
	 * currently active RocksDB store under the key {@code beanNum}.
	 *
	 * @param beanNum number used (in decimal string form) as the RocksDB key
	 * @param clazz   class passed to the {@link StoreEntity} constructor
	 * @param type    operation type passed to the {@link StoreEntity} constructor
	 * @param b       raw bytes passed to the {@link StoreEntity} constructor
	 */
	public static void addToList(long beanNum, Class<?> clazz, DBOperationType type, byte[] b)
	{
		put(beanNum, new StoreEntity(clazz, type, b), false);
	}

	/**
	 * Stores an already built {@link StoreEntity} in the currently active
	 * RocksDB store under the key {@code beanNum} and logs the write.
	 *
	 * @param beanNum number used (in decimal string form) as the RocksDB key
	 * @param entity  entity to serialize and store
	 */
	public static void addToList(long beanNum, StoreEntity entity)
	{
		put(beanNum, entity, true);
	}

	/**
	 * Serializes {@code entity} and adds it to whichever store the
	 * {@code exchange} flag selects, holding that store's read lock.
	 */
	private static void put(long beanNum, StoreEntity entity, boolean logWrite)
	{
		// Read the flag once so lock, store and path always belong together.
		final boolean useFirst = exchange;
		final ReadWriteLock lock = useFirst ? lock1 : lock2;
		final RocksDB db = useFirst ? db1 : db2;

		lock.readLock().lock();
		try
		{
			RocksDBUtil.addKeyToRocksDB(db, String.valueOf(beanNum).getBytes(), KryoUtil.serilize(StoreEntity.class, entity));
			if (logWrite)
			{
				logger.info("Add key to RocksDB " + (useFirst ? dbPath1 : dbPath2));
			}
		}
		finally
		{
			lock.readLock().unlock();
		}
	}

	/**
	 * Reports whether nothing is left to write: both in-memory batches are
	 * empty (or were never loaded) and both RocksDB stores are empty.
	 *
	 * @return {@code true} if no pending data is found in either batch or store
	 */
	public static boolean isRocksDBFinishedWriting()
	{
		// Snapshot the references; they are null before the first drain.
		final ArrayList<StoreEntity> first = list1;
		final ArrayList<StoreEntity> second = list2;
		final boolean batchesEmpty = (first == null || first.isEmpty()) && (second == null || second.isEmpty());
		return batchesEmpty && RocksDBUtil.isRocksDBEmpty(db1) && RocksDBUtil.isRocksDBEmpty(db2);
	}

	/**
	 * Switches producers to the other store, then reads the entities from the
	 * previously active store and writes each one through
	 * {@link WriteToDBUtil#writeStoreEntityToDB}.
	 *
	 * <p>NOTE(review): whether {@code RocksDBUtil.getKeysFromRocksDB} also
	 * removes the returned entries from the store is not visible here; confirm
	 * against that helper.
	 */
	public static void writeToDatabase()
	{
		if (exchange)
		{
			lock1.writeLock().lock();
			try
			{
				exchange = false;
				list1 = RocksDBUtil.getKeysFromRocksDB(db1);
				flush(list1, dbPath1);
			}
			finally
			{
				lock1.writeLock().unlock();
			}
		}
		else
		{
			lock2.writeLock().lock();
			try
			{
				exchange = true;
				list2 = RocksDBUtil.getKeysFromRocksDB(db2);
				flush(list2, dbPath2);
			}
			finally
			{
				lock2.writeLock().unlock();
			}
		}
	}

	/**
	 * Writes every entity of the batch to the database, logs the batch size
	 * when it is non-empty, and clears the batch.
	 */
	private static void flush(ArrayList<StoreEntity> batch, String dbPath)
	{
		for (StoreEntity entity : batch)
		{
			WriteToDBUtil.writeStoreEntityToDB(entity);
		}
		if (batch.size() > 0)
		{
			logger.info("Get " + batch.size() + " key-value(s) from RocksDB " + dbPath + ".");
		}
		batch.clear();
	}

	/**
	 * Closes both RocksDB stores via {@link RocksDBUtil#closeRocksDB}.
	 */
	public static void closeRocksDB()
	{
		RocksDBUtil.closeRocksDB(db1);
		RocksDBUtil.closeRocksDB(db2);
	}

	/**
	 * Closes both stores when an instance is finalized.
	 *
	 * <p>NOTE(review): the stores are static and shared, so finalizing any
	 * instance closes them for every user of this class; all other members are
	 * static, so instances are presumably never created. Kept for
	 * compatibility; prefer calling {@link #closeRocksDB()} explicitly.
	 */
	@Override
	protected void finalize()
	{
		closeRocksDB();
	}
}